<!DOCTYPE html>
<html lang="zh-CN">
  <head>
  <!-- Document metadata. The charset declaration comes first so the encoding is
       known before any non-ASCII text (such as the title) is parsed. -->
  <meta charset="utf-8">
  <meta name="generator" content="Hexo 3.8.0">
  <!-- Zoom is deliberately left enabled: maximum-scale / user-scalable=0 would
       prevent pinch-zoom for readers who need larger text. -->
  <meta name="viewport" content="width=device-width, initial-scale=1.0">
  <meta name="description" content="fengzhaoyang&#39;s blog">
  <!-- The standard metadata name is "keywords" (plural). -->
  <meta name="keywords" content="hexo-theme, vuejs">

  <link rel="shortcut icon" href="/css/images/logo.png">

  <title>java异步计算场景应用 | fzy-blog</title>

  <!-- Third-party styles are requested over explicit https rather than
       protocol-relative URLs, followed by the theme stylesheet. -->
  <link rel="stylesheet" href="https://cdnjs.cloudflare.com/ajax/libs/font-awesome/4.7.0/css/font-awesome.min.css">
  <link rel="stylesheet" href="https://cdnjs.cloudflare.com/ajax/libs/nprogress/0.2.0/nprogress.min.css">
  <link rel="stylesheet" href="https://cdnjs.cloudflare.com/ajax/libs/highlight.js/9.12.0/styles/tomorrow.min.css">
  <link rel="stylesheet" href="/css/style.css">

  <!-- Scripts stay synchronous and in their original order.
       NOTE(review): code later in the page may expect jQuery to exist at parse
       time, so confirm that before adding defer/async. -->
  <script src="https://cdnjs.cloudflare.com/ajax/libs/jquery/3.2.1/jquery.min.js"></script>
  <script src="https://cdnjs.cloudflare.com/ajax/libs/geopattern/1.2.3/js/geopattern.min.js"></script>
  <script src="https://cdnjs.cloudflare.com/ajax/libs/nprogress/0.2.0/nprogress.min.js"></script>

  <script src="/js/qrious.js"></script>

    <!-- MathJax support START -->
    <!-- Both configuration blocks must appear before MathJax.js is loaded. -->
    <script type="text/x-mathjax-config">
      // Accept $...$ and \(...\) as inline math delimiters, honour \$ escapes,
      // and leave the contents of code-like tags untouched.
      MathJax.Hub.Config({
        tex2jax: {
          inlineMath: [ ['$','$'], ["\\(","\\)"]  ],
          processEscapes: true,
          skipTags: ['script', 'noscript', 'style', 'textarea', 'pre', 'code']
        }
      });
    </script>

    <script type="text/x-mathjax-config">
      // Queued callback: append the "has-jax" class to the parent node of every
      // math element MathJax reports via getAllJax().
      MathJax.Hub.Queue(function() {
        var all = MathJax.Hub.getAllJax(), i;
        for (i=0; i < all.length; i += 1) {
          all[i].SourceElement().parentNode.className += ' has-jax';
        }
      });
    </script>
    <script src="https://cdnjs.cloudflare.com/ajax/libs/mathjax/2.7.5/MathJax.js?config=TeX-AMS-MML_HTMLorMML"></script>
    <!-- MathJax support END -->

</head>
<body>
    <!-- The body must be opened before any flow content. With the wrapper placed
         ahead of the <body> tag, the parser opened the body implicitly and then
         treated the literal <body> tag as a parse error. The resulting DOM tree
         is the same as before. -->
    <!-- Site logo inside the .wechat-share wrapper.
         NOTE(review): presumably a share thumbnail hidden by the theme CSS, so
         it is marked decorative with an empty alt. Confirm. -->
    <div class="wechat-share">
      <img src="/css/images/logo.png" alt="">
    </div>
    <!-- Site header: home link, primary link list, menu icon, and a second copy
         of the same links inside .menu-mask. -->
    <header class="header fixed-header">
      <div class="header-container">
        <!-- Brand link back to the site root -->
        <a class="home-link" href="/">
          <div class="logo"></div>
          <span>fzy-blog</span>
        </a>

        <!-- Primary link list -->
        <ul class="right-list">
          <li class="list-item">
            <a href="/" class="item-link">Home</a>
          </li>
          <li class="list-item">
            <a href="/tags/" class="item-link">Tags</a>
          </li>
          <li class="list-item">
            <a href="/archives/" class="item-link">Archives</a>
          </li>
          <li class="list-item">
            <a href="/project/" class="item-link">Projects</a>
          </li>
          <li class="list-item">
            <a href="/about/" class="item-link">About</a>
          </li>
        </ul>

        <!-- Three-bar menu icon.
             NOTE(review): presumably toggles .menu-mask through the theme
             script, which is not visible here. -->
        <div class="menu">
          <span class="icon-bar"></span>
          <span class="icon-bar"></span>
          <span class="icon-bar"></span>
        </div>

        <!-- Same five destinations as the list above, using the menu-* classes -->
        <div class="menu-mask">
          <ul class="menu-list">
            <li class="menu-item">
              <a href="/" class="menu-link">Home</a>
            </li>
            <li class="menu-item">
              <a href="/tags/" class="menu-link">Tags</a>
            </li>
            <li class="menu-item">
              <a href="/archives/" class="menu-link">Archives</a>
            </li>
            <li class="menu-item">
              <a href="/project/" class="menu-link">Projects</a>
            </li>
            <li class="menu-item">
              <a href="/about/" class="menu-link">About</a>
            </li>
          </ul>
        </div>
      </div>
    </header>

    <!-- Article banner: post title, publication date, and the arrow link below them. -->
    <div id="article-banner">
      <h2>java异步计算场景应用</h2>
      <!-- <time> makes the publication date machine-readable; the visible text is unchanged. -->
      <p class="post-date"><time datetime="2019-05-24">2019-05-24</time></p>
      <!-- The arrow link has no text content, so aria-label supplies its accessible name.
           NOTE(review): the label assumes the theme script scrolls to the article
           when the arrow is activated. Confirm against the theme JS. -->
      <div class="arrow-down">
        <a href="javascript:;" aria-label="Scroll down to the article"></a>
      </div>
    </div>
<main class="app-body flex-box">
  <!-- Article START -->
  <article class="post-article">
    <section class="markdown-content"><p>最近项目中遇到一个业务场景：<br>将当前数据库中的表迁移到另外一个数据库中，为满足迁移效率需要进行并发数据迁移。对每一张数据表可以启动不同的线程同时迁移数据。迁移完成后，同步更新该迁移任务对应的状态字段。<br>最先想到的是使用 Java 中的并发工具类：同步屏障 CyclicBarrier。<br>CyclicBarrier 的字面意思是可循环使用（Cyclic）的屏障（Barrier）。它要做的事情是，让一组线程到达一个屏障（也可以叫同步点）时被阻塞，直到最后一个线程到达屏障时，屏障才会开门，所有被屏障拦截的线程才会继续运行。<br>CyclicBarrier 可以用于多线程计算数据，最后合并计算结果的场景。<br>一、通过 CyclicBarrier 实现迁移任务代码：</p>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br><span class="line">11</span><br><span class="line">12</span><br><span class="line">13</span><br><span class="line">14</span><br><span class="line">15</span><br><span class="line">16</span><br><span class="line">17</span><br><span class="line">18</span><br><span class="line">19</span><br><span class="line">20</span><br><span class="line">21</span><br><span class="line">22</span><br><span class="line">23</span><br><span class="line">24</span><br><span class="line">25</span><br><span class="line">26</span><br><span class="line">27</span><br><span class="line">28</span><br><span class="line">29</span><br><span class="line">30</span><br><span class="line">31</span><br><span class="line">32</span><br><span class="line">33</span><br><span class="line">34</span><br><span class="line">35</span><br><span class="line">36</span><br><span class="line">37</span><br><span class="line">38</span><br><span class="line">39</span><br><span class="line">40</span><br><span class="line">41</span><br><span class="line">42</span><br><span class="line">43</span><br><span class="line">44</span><br><span class="line">45</span><br><span class="line">46</span><br><span class="line">47</span><br><span class="line">48</span><br></pre></td><td class="code"><pre><span class="line"><span class="keyword">package</span> com.future.test;</span><br><span class="line"></span><br><span class="line"><span class="keyword">import</span> java.util.List;</span><br><span class="line"><span class="keyword">import</span> java.util.concurrent.CyclicBarrier;</span><br><span class="line"><span class="keyword">import</span> 
java.util.concurrent.ExecutorService;</span><br><span class="line"><span class="keyword">import</span> java.util.concurrent.Executors;</span><br><span class="line"></span><br><span class="line"><span class="keyword">public</span> <span class="class"><span class="keyword">class</span> <span class="title">VerticaTransfer</span> <span class="keyword">extends</span> <span class="title">DataTransfer</span>&lt;<span class="title">DataInfo</span>&gt;</span>&#123;</span><br><span class="line">    <span class="keyword">int</span> threadCount = <span class="number">10</span>;</span><br><span class="line">    <span class="comment">//线程调度</span></span><br><span class="line">    ExecutorService executor = <span class="keyword">null</span>;</span><br><span class="line">    CyclicBarrier barrier;</span><br><span class="line">    <span class="comment">//计算结果集</span></span><br><span class="line">    <span class="function"><span class="keyword">protected</span> <span class="keyword">void</span> <span class="title">doBefore</span><span class="params">(DataInfo entity)</span></span>&#123;</span><br><span class="line">        <span class="comment">//线程池</span></span><br><span class="line">        executor = Executors.newFixedThreadPool(threadCount);</span><br><span class="line">        <span class="comment">//CyclicBarrier可以用于多线程计算数据，最后处理结果的场景</span></span><br><span class="line">        barrier = <span class="keyword">new</span> CyclicBarrier(threadCount,<span class="keyword">new</span> DoAfter(<span class="keyword">this</span>,entity));</span><br><span class="line">    &#125;</span><br><span class="line">    <span class="function"><span class="keyword">protected</span> <span class="keyword">void</span> <span class="title">doJob</span><span class="params">(DataInfo entity)</span></span>&#123;</span><br><span class="line">        <span class="comment">//并行计算</span></span><br><span class="line">        List&lt;Product&gt; ps = entity.getProducts();</span><br><span class="line">        
<span class="keyword">for</span> (Product product : ps) &#123;</span><br><span class="line">            executor.execute(<span class="keyword">new</span> VerticaTransferTask(barrier,product));</span><br><span class="line">        &#125;</span><br><span class="line"></span><br><span class="line">    &#125;</span><br><span class="line">    <span class="meta">@Override</span></span><br><span class="line">    <span class="function"><span class="keyword">protected</span> <span class="keyword">void</span> <span class="title">doAfter</span><span class="params">(DataInfo entity)</span> </span>&#123;</span><br><span class="line">    &#125;</span><br><span class="line"></span><br><span class="line">&#125;</span><br><span class="line"><span class="comment">/**</span></span><br><span class="line"><span class="comment"> * 合并计算处理</span></span><br><span class="line"><span class="comment"> * <span class="doctag">@author</span> Administrator</span></span><br><span class="line"><span class="comment"> *</span></span><br><span class="line"><span class="comment"> */</span></span><br><span class="line"><span class="class"><span class="keyword">class</span> <span class="title">DoAfter</span> <span class="keyword">implements</span> <span class="title">Runnable</span> </span>&#123;</span><br><span class="line">    <span class="keyword">private</span> VerticaTransfer verticaTransfer;</span><br><span class="line">    <span class="keyword">private</span> DataInfo entity;</span><br><span class="line">    DoAfter(VerticaTransfer verticaTransfer,DataInfo entity) &#123;</span><br><span class="line">        <span class="keyword">this</span>.verticaTransfer = verticaTransfer;</span><br><span class="line">        <span class="keyword">this</span>.entity = entity;</span><br><span class="line">    &#125;</span><br><span class="line">    <span class="function"><span class="keyword">public</span> <span class="keyword">void</span> <span class="title">run</span><span class="params">()</span> 
</span>&#123;</span><br><span class="line">        System.out.println(<span class="string">"迁移完成。共迁移："</span> + entity.getProducts().size());</span><br><span class="line">    &#125;</span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure>
<p>业务处理代码：</p>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br><span class="line">11</span><br><span class="line">12</span><br><span class="line">13</span><br><span class="line">14</span><br><span class="line">15</span><br><span class="line">16</span><br><span class="line">17</span><br><span class="line">18</span><br><span class="line">19</span><br><span class="line">20</span><br><span class="line">21</span><br><span class="line">22</span><br><span class="line">23</span><br><span class="line">24</span><br><span class="line">25</span><br><span class="line">26</span><br><span class="line">27</span><br><span class="line">28</span><br><span class="line">29</span><br><span class="line">30</span><br><span class="line">31</span><br><span class="line">32</span><br><span class="line">33</span><br><span class="line">34</span><br><span class="line">35</span><br><span class="line">36</span><br><span class="line">37</span><br><span class="line">38</span><br><span class="line">39</span><br><span class="line">40</span><br><span class="line">41</span><br><span class="line">42</span><br><span class="line">43</span><br><span class="line">44</span><br></pre></td><td class="code"><pre><span class="line"><span class="keyword">package</span> com.future.test;</span><br><span class="line"></span><br><span class="line"><span class="keyword">import</span> java.util.concurrent.BrokenBarrierException;</span><br><span class="line"><span class="keyword">import</span> java.util.concurrent.CyclicBarrier;</span><br><span class="line"></span><br><span class="line"><span class="comment">/**</span></span><br><span class="line"><span class="comment"> * 数据迁移执行任务</span></span><br><span 
class="line"><span class="comment"> * <span class="doctag">@author</span></span></span><br><span class="line"><span class="comment"> *</span></span><br><span class="line"><span class="comment"> */</span></span><br><span class="line"><span class="keyword">public</span> <span class="class"><span class="keyword">class</span> <span class="title">VerticaTransferTask</span> <span class="keyword">implements</span> <span class="title">Runnable</span></span>&#123;</span><br><span class="line"></span><br><span class="line">    <span class="keyword">private</span> CyclicBarrier barrier;</span><br><span class="line">    <span class="keyword">private</span> Product product;</span><br><span class="line">    VerticaTransferTask(Product product)&#123;</span><br><span class="line">        <span class="keyword">this</span>.product = product;</span><br><span class="line">    &#125;</span><br><span class="line">    VerticaTransferTask(CyclicBarrier barrier,Product product)&#123;</span><br><span class="line">        <span class="keyword">this</span>.barrier = barrier;</span><br><span class="line">        <span class="keyword">this</span>.product = product;</span><br><span class="line">    &#125;</span><br><span class="line"></span><br><span class="line"></span><br><span class="line">    <span class="meta">@Override</span></span><br><span class="line">    <span class="function"><span class="keyword">public</span> <span class="keyword">void</span> <span class="title">run</span><span class="params">()</span> </span>&#123;</span><br><span class="line">        <span class="comment">// TODO Auto-generated method stub</span></span><br><span class="line">        <span class="keyword">try</span> &#123;</span><br><span class="line">            System.out.println(<span class="string">"进行迁移 :"</span> + product.getId());</span><br><span class="line">            Thread.sleep(<span class="number">1000</span>);</span><br><span class="line">        &#125;<span class="keyword">catch</span>(Exception 
e)&#123;</span><br><span class="line">            e.printStackTrace();</span><br><span class="line">        &#125; <span class="keyword">finally</span> &#123;</span><br><span class="line">            <span class="keyword">try</span> &#123;</span><br><span class="line">                barrier.await();</span><br><span class="line">            &#125; <span class="keyword">catch</span> (InterruptedException e) &#123;</span><br><span class="line">                <span class="comment">// TODO Auto-generated catch block</span></span><br><span class="line">                e.printStackTrace();</span><br><span class="line">            &#125; <span class="keyword">catch</span> (BrokenBarrierException e) &#123;</span><br><span class="line">                <span class="comment">// TODO Auto-generated catch block</span></span><br><span class="line">                e.printStackTrace();</span><br><span class="line">            &#125;</span><br><span class="line">        &#125;</span><br><span class="line">    &#125;</span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure>
<p>测试入口：</p>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br><span class="line">11</span><br><span class="line">12</span><br><span class="line">13</span><br><span class="line">14</span><br><span class="line">15</span><br><span class="line">16</span><br><span class="line">17</span><br><span class="line">18</span><br><span class="line">19</span><br><span class="line">20</span><br><span class="line">21</span><br><span class="line">22</span><br><span class="line">23</span><br><span class="line">24</span><br><span class="line">25</span><br><span class="line">26</span><br></pre></td><td class="code"><pre><span class="line"></span><br><span class="line"><span class="keyword">package</span> com.future.test;</span><br><span class="line"></span><br><span class="line"><span class="keyword">import</span> java.util.ArrayList;</span><br><span class="line"><span class="keyword">import</span> java.util.List;</span><br><span class="line"></span><br><span class="line"><span class="keyword">public</span> <span class="class"><span class="keyword">class</span> <span class="title">VerticaTransferTest</span></span>&#123;</span><br><span class="line">    <span class="function"><span class="keyword">public</span> <span class="keyword">static</span> <span class="keyword">void</span> <span class="title">main</span><span class="params">(String[] args)</span> </span>&#123;</span><br><span class="line">            VerticaTransfer transfer = <span class="keyword">new</span> VerticaTransfer(); <span class="comment">//</span></span><br><span class="line">            DataInfo data = <span class="keyword">new</span> DataInfo();</span><br><span class="line">            List&lt;Product&gt; 
ps = <span class="keyword">new</span> ArrayList&lt;Product&gt;();</span><br><span class="line">            <span class="keyword">int</span> tmp = <span class="number">0</span>;</span><br><span class="line">            <span class="keyword">for</span>(<span class="keyword">int</span> i = <span class="number">0</span>; i &lt; <span class="number">10</span>;i++)&#123;</span><br><span class="line">                Product p = <span class="keyword">new</span> Product();</span><br><span class="line">                p.setId(i + <span class="string">""</span>);</span><br><span class="line">                p.setPurchase_price(<span class="number">10</span>);</span><br><span class="line">                p.setSalse_price(<span class="number">10</span> + i);</span><br><span class="line">                ps.add(p);</span><br><span class="line">                tmp += i;</span><br><span class="line">            &#125;</span><br><span class="line"></span><br><span class="line">            data.setProducts(ps);</span><br><span class="line">            transfer.execute(data);</span><br><span class="line">    &#125;</span><br><span class="line"></span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure>
<p>通过上述实现步骤，完全可以实现业务场景。<br>增强业务场景：在上述场景基础上，对每次迁移的结果进行最终的汇总：多少迁移成功，多少迁移失败。也就是对每个线程的处理结果进行汇总。<br>这就涉及到线程间通信的问题。在现有处理的基础上，添加一个公共 List 变量，在迁移任务 VerticaTransferTask 的 run() 方法中以 synchronized 方式将迁移结果放入 List 中即可。<br>但是，有没有更好的实现方式呢？<br>Future 接口<br>描述：从 Java 1.5 开始，提供了 Callable 和 Future，通过它们可以在任务执行完毕之后得到任务执行结果。<br>这就表示我们可以通过 Future 获取每个线程的执行结果。下面我通过并行计算产品利润的方式简单实现需求。<br>二、通过 Future 实现并行处理任务代码：</p>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br><span class="line">11</span><br><span class="line">12</span><br><span class="line">13</span><br><span class="line">14</span><br><span class="line">15</span><br><span class="line">16</span><br><span class="line">17</span><br><span class="line">18</span><br><span class="line">19</span><br><span class="line">20</span><br><span class="line">21</span><br><span class="line">22</span><br><span class="line">23</span><br><span class="line">24</span><br><span class="line">25</span><br><span class="line">26</span><br><span class="line">27</span><br><span class="line">28</span><br><span class="line">29</span><br><span class="line">30</span><br><span class="line">31</span><br><span class="line">32</span><br><span class="line">33</span><br><span class="line">34</span><br><span class="line">35</span><br><span class="line">36</span><br><span class="line">37</span><br><span class="line">38</span><br><span class="line">39</span><br><span class="line">40</span><br><span class="line">41</span><br><span class="line">42</span><br><span class="line">43</span><br><span class="line">44</span><br><span class="line">45</span><br><span class="line">46</span><br><span class="line">47</span><br><span class="line">48</span><br><span class="line">49</span><br><span class="line">50</span><br><span class="line">51</span><br></pre></td><td class="code"><pre><span class="line"></span><br><span class="line"><span class="keyword">package</span> com.test;</span><br><span class="line"></span><br><span class="line"><span class="keyword">import</span> java.util.ArrayList;</span><br><span class="line"><span class="keyword">import</span> 
java.util.List;</span><br><span class="line"><span class="keyword">import</span> java.util.concurrent.CyclicBarrier;</span><br><span class="line"><span class="keyword">import</span> java.util.concurrent.ExecutionException;</span><br><span class="line"><span class="keyword">import</span> java.util.concurrent.ExecutorService;</span><br><span class="line"><span class="keyword">import</span> java.util.concurrent.Executors;</span><br><span class="line"><span class="keyword">import</span> java.util.concurrent.Future;</span><br><span class="line"></span><br><span class="line"><span class="keyword">public</span> <span class="class"><span class="keyword">class</span> <span class="title">VerticaTransfer</span> <span class="keyword">extends</span> <span class="title">DataTransfer</span>&lt;<span class="title">DataInfo</span>&gt;</span>&#123;</span><br><span class="line">    <span class="keyword">int</span> threadCount = <span class="number">10</span>;</span><br><span class="line">    <span class="comment">//线程调度</span></span><br><span class="line">    ExecutorService executor = <span class="keyword">null</span>;</span><br><span class="line">    <span class="comment">//计算结果集</span></span><br><span class="line">    List&lt;Future&lt;ResultInfo&gt;&gt; results = <span class="keyword">new</span> ArrayList&lt;Future&lt;ResultInfo&gt;&gt;();</span><br><span class="line">    <span class="function"><span class="keyword">protected</span> <span class="keyword">void</span> <span class="title">doBefore</span><span class="params">(DataInfo entity)</span></span>&#123;</span><br><span class="line">        <span class="comment">//线程池</span></span><br><span class="line">        executor = Executors.newFixedThreadPool(threadCount);</span><br><span class="line"></span><br><span class="line">    &#125;</span><br><span class="line">    <span class="function"><span class="keyword">protected</span> <span class="keyword">void</span> <span class="title">doJob</span><span class="params">(DataInfo 
entity)</span></span>&#123;</span><br><span class="line">        <span class="comment">//并行计算</span></span><br><span class="line">        List&lt;Product&gt; ps = entity.getProducts();</span><br><span class="line">        <span class="keyword">for</span> (Product product : ps) &#123;</span><br><span class="line">            Future&lt;ResultInfo&gt; res = executor.submit(<span class="keyword">new</span> VerticaTransferTask(product));</span><br><span class="line">            results.add(res);</span><br><span class="line">        &#125;</span><br><span class="line"></span><br><span class="line">    &#125;</span><br><span class="line">    <span class="meta">@Override</span></span><br><span class="line">    <span class="function"><span class="keyword">protected</span> <span class="keyword">void</span> <span class="title">doAfter</span><span class="params">(DataInfo entity)</span> </span>&#123;</span><br><span class="line">        <span class="keyword">double</span> total = <span class="number">0</span>;</span><br><span class="line">        List&lt;Future&lt;ResultInfo&gt;&gt; rs = <span class="keyword">this</span>.results;</span><br><span class="line">        <span class="keyword">for</span> (Future&lt;ResultInfo&gt; future : rs) &#123;</span><br><span class="line">            <span class="keyword">try</span> &#123;</span><br><span class="line">                ResultInfo info = future.get();</span><br><span class="line">                total += info.getPrice();</span><br><span class="line">            &#125; <span class="keyword">catch</span> (InterruptedException e) &#123;</span><br><span class="line">                <span class="comment">// TODO Auto-generated catch block</span></span><br><span class="line">                e.printStackTrace();</span><br><span class="line">            &#125; <span class="keyword">catch</span> (ExecutionException e) &#123;</span><br><span class="line">                <span class="comment">// TODO Auto-generated catch 
block</span></span><br><span class="line">                e.printStackTrace();</span><br><span class="line">            &#125;</span><br><span class="line">        &#125;</span><br><span class="line">        System.out.println(<span class="string">"产品总利润："</span> + total);</span><br><span class="line">    &#125;</span><br><span class="line"></span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure>
<p>业务处理代码：</p>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br><span class="line">11</span><br><span class="line">12</span><br><span class="line">13</span><br><span class="line">14</span><br><span class="line">15</span><br><span class="line">16</span><br><span class="line">17</span><br><span class="line">18</span><br><span class="line">19</span><br><span class="line">20</span><br><span class="line">21</span><br><span class="line">22</span><br><span class="line">23</span><br><span class="line">24</span><br><span class="line">25</span><br><span class="line">26</span><br><span class="line">27</span><br><span class="line">28</span><br><span class="line">29</span><br><span class="line">30</span><br><span class="line">31</span><br><span class="line">32</span><br><span class="line">33</span><br><span class="line">34</span><br></pre></td><td class="code"><pre><span class="line"></span><br><span class="line"><span class="keyword">package</span> com.test;</span><br><span class="line"></span><br><span class="line"><span class="keyword">import</span> java.util.concurrent.Callable;</span><br><span class="line"></span><br><span class="line"><span class="comment">/**</span></span><br><span class="line"><span class="comment"> * 数据迁移执行任务</span></span><br><span class="line"><span class="comment"> * <span class="doctag">@author</span></span></span><br><span class="line"><span class="comment"> *</span></span><br><span class="line"><span class="comment"> */</span></span><br><span class="line"><span class="keyword">public</span> <span class="class"><span class="keyword">class</span> <span class="title">VerticaTransferTask</span> <span class="keyword">implements</span> <span 
class="title">Callable</span>&lt;<span class="title">ResultInfo</span>&gt;</span>&#123;</span><br><span class="line"></span><br><span class="line">    <span class="keyword">private</span> Product product;</span><br><span class="line">    VerticaTransferTask(Product product)&#123;</span><br><span class="line">        <span class="keyword">this</span>.product = product;</span><br><span class="line">    &#125;</span><br><span class="line"></span><br><span class="line"></span><br><span class="line">    <span class="meta">@Override</span></span><br><span class="line">    <span class="function"><span class="keyword">public</span> ResultInfo <span class="title">call</span><span class="params">()</span> <span class="keyword">throws</span> Exception </span>&#123;</span><br><span class="line">        <span class="comment">// TODO Auto-generated method stub</span></span><br><span class="line">        ResultInfo res = <span class="keyword">null</span>;</span><br><span class="line">        <span class="keyword">try</span> &#123;</span><br><span class="line">            <span class="keyword">double</span> money = product.getSalse_price() - product.getPurchase_price();</span><br><span class="line">            res = <span class="keyword">new</span> ResultInfo();</span><br><span class="line">            res.setPrice(money);</span><br><span class="line">            res.setProductId(product.getId());</span><br><span class="line">            Thread.sleep(<span class="number">1000</span>);</span><br><span class="line">        &#125;<span class="keyword">catch</span>(Exception e)&#123;</span><br><span class="line">            e.printStackTrace();</span><br><span class="line">        &#125;</span><br><span class="line">        <span class="keyword">return</span> res;</span><br><span class="line">    &#125;</span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure>
<p>很简单，我们就实现了并行计算并合并结果集。<br>那我能不能两者一起使用呢？我在 CyclicBarrier 处理结果的 DoAfter 类中获取 Future 结果进行统计。<br>这样不就可以满足需求了吗？设想处理如下：</p>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br><span class="line">11</span><br><span class="line">12</span><br><span class="line">13</span><br><span class="line">14</span><br><span class="line">15</span><br><span class="line">16</span><br><span class="line">17</span><br><span class="line">18</span><br><span class="line">19</span><br><span class="line">20</span><br><span class="line">21</span><br><span class="line">22</span><br><span class="line">23</span><br><span class="line">24</span><br><span class="line">25</span><br><span class="line">26</span><br><span class="line">27</span><br><span class="line">28</span><br><span class="line">29</span><br><span class="line">30</span><br><span class="line">31</span><br><span class="line">32</span><br><span class="line">33</span><br><span class="line">34</span><br><span class="line">35</span><br><span class="line">36</span><br><span class="line">37</span><br><span class="line">38</span><br><span class="line">39</span><br><span class="line">40</span><br><span class="line">41</span><br><span class="line">42</span><br><span class="line">43</span><br><span class="line">44</span><br><span class="line">45</span><br><span class="line">46</span><br><span class="line">47</span><br><span class="line">48</span><br><span class="line">49</span><br><span class="line">50</span><br><span class="line">51</span><br><span class="line">52</span><br><span class="line">53</span><br><span class="line">54</span><br><span class="line">55</span><br><span class="line">56</span><br><span class="line">57</span><br><span class="line">58</span><br><span class="line">59</span><br></pre></td><td class="code"><pre><span 
class="line"></span><br><span class="line"><span class="keyword">public</span> <span class="class"><span class="keyword">class</span> <span class="title">VerticaTransfer</span> <span class="keyword">extends</span> <span class="title">DataTransfer</span>&lt;<span class="title">DataInfo</span>&gt;</span>&#123;</span><br><span class="line">    <span class="keyword">int</span> threadCount = <span class="number">10</span>;</span><br><span class="line">    <span class="comment">//线程调度</span></span><br><span class="line">    ExecutorService executor = <span class="keyword">null</span>;</span><br><span class="line">    CyclicBarrier barrier;</span><br><span class="line">    <span class="comment">//计算结果集</span></span><br><span class="line">    List&lt;Future&lt;ResultInfo&gt;&gt; results = <span class="keyword">new</span> ArrayList&lt;Future&lt;ResultInfo&gt;&gt;();</span><br><span class="line">    <span class="function"><span class="keyword">protected</span> <span class="keyword">void</span> <span class="title">doBefore</span><span class="params">(DataInfo entity)</span></span>&#123;</span><br><span class="line">        <span class="comment">//线程池</span></span><br><span class="line">        executor = Executors.newFixedThreadPool(threadCount);</span><br><span class="line">        <span class="comment">//CyclicBarrier可以用于多线程计算数据，最后处理结果的场景</span></span><br><span class="line">        barrier = <span class="keyword">new</span> CyclicBarrier(threadCount,<span class="keyword">new</span> DoAfter(<span class="keyword">this</span>,entity));</span><br><span class="line">    &#125;</span><br><span class="line">    <span class="function"><span class="keyword">protected</span> <span class="keyword">void</span> <span class="title">doJob</span><span class="params">(DataInfo entity)</span></span>&#123;</span><br><span class="line">        <span class="comment">//并行计算</span></span><br><span class="line">        List&lt;Product&gt; ps = entity.getProducts();</span><br><span class="line">    
    <span class="keyword">for</span> (Product product : ps) &#123;</span><br><span class="line">            Future&lt;ResultInfo&gt; res = executor.submit(<span class="keyword">new</span> VerticaTransferTask(product));</span><br><span class="line">            results.add(res);</span><br><span class="line">        &#125;</span><br><span class="line"></span><br><span class="line">    &#125;</span><br><span class="line">    <span class="meta">@Override</span></span><br><span class="line">    <span class="function"><span class="keyword">protected</span> <span class="keyword">void</span> <span class="title">doAfter</span><span class="params">(DataInfo entity)</span> </span>&#123;</span><br><span class="line"></span><br><span class="line">    &#125;</span><br><span class="line"></span><br><span class="line">&#125;</span><br><span class="line"><span class="comment">/**</span></span><br><span class="line"><span class="comment"> * 合并计算处理</span></span><br><span class="line"><span class="comment"> * <span class="doctag">@author</span> Administrator</span></span><br><span class="line"><span class="comment"> *</span></span><br><span class="line"><span class="comment"> */</span></span><br><span class="line"><span class="class"><span class="keyword">class</span> <span class="title">DoAfter</span> <span class="keyword">implements</span> <span class="title">Runnable</span> </span>&#123;</span><br><span class="line">    <span class="keyword">private</span> VerticaTransfer verticaTransfer;</span><br><span class="line">    <span class="keyword">private</span> DataInfo entity;</span><br><span class="line">    DoAfter(VerticaTransfer verticaTransfer,DataInfo entity) &#123;</span><br><span class="line">        <span class="keyword">this</span>.verticaTransfer = verticaTransfer;</span><br><span class="line">        <span class="keyword">this</span>.entity = entity;</span><br><span class="line">    &#125;</span><br><span class="line">    <span class="function"><span 
class="keyword">public</span> <span class="keyword">void</span> <span class="title">run</span><span class="params">()</span> </span>&#123;</span><br><span class="line">        <span class="keyword">double</span> total = <span class="number">0</span>;</span><br><span class="line">        List&lt;Future&lt;ResultInfo&gt;&gt; rs = verticaTransfer.results;</span><br><span class="line">        <span class="keyword">for</span> (Future&lt;ResultInfo&gt; future : rs) &#123;</span><br><span class="line">            <span class="keyword">try</span> &#123;</span><br><span class="line">                ResultInfo info = future.get();</span><br><span class="line">                total += info.getPrice();</span><br><span class="line">            &#125; <span class="keyword">catch</span> (InterruptedException e) &#123;</span><br><span class="line">                <span class="comment">// TODO Auto-generated catch block</span></span><br><span class="line">                e.printStackTrace();</span><br><span class="line">            &#125; <span class="keyword">catch</span> (ExecutionException e) &#123;</span><br><span class="line">                <span class="comment">// TODO Auto-generated catch block</span></span><br><span class="line">                e.printStackTrace();</span><br><span class="line">            &#125;</span><br><span class="line">        &#125;</span><br><span class="line">        System.out.println(<span class="string">"产品总利润："</span> + total);</span><br><span class="line">    &#125;</span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure>
<p>业务处理 VerticaTransferTask：</p>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br><span class="line">11</span><br><span class="line">12</span><br><span class="line">13</span><br><span class="line">14</span><br><span class="line">15</span><br><span class="line">16</span><br><span class="line">17</span><br><span class="line">18</span><br><span class="line">19</span><br><span class="line">20</span><br><span class="line">21</span><br><span class="line">22</span><br><span class="line">23</span><br><span class="line">24</span><br><span class="line">25</span><br><span class="line">26</span><br><span class="line">27</span><br><span class="line">28</span><br><span class="line">29</span><br><span class="line">30</span><br><span class="line">31</span><br><span class="line">32</span><br><span class="line">33</span><br><span class="line">34</span><br><span class="line">35</span><br><span class="line">36</span><br><span class="line">37</span><br><span class="line">38</span><br><span class="line">39</span><br><span class="line">40</span><br></pre></td><td class="code"><pre><span class="line"></span><br><span class="line"><span class="keyword">public</span> <span class="class"><span class="keyword">class</span> <span class="title">VerticaTransferTask</span> <span class="keyword">implements</span> <span class="title">Callable</span>&lt;<span class="title">ResultInfo</span>&gt;</span>&#123;</span><br><span class="line"></span><br><span class="line">    <span class="keyword">private</span> CyclicBarrier barrier;</span><br><span class="line">    <span class="keyword">private</span> Product product;</span><br><span class="line">    VerticaTransferTask(Product product)&#123;</span><br><span 
class="line">        <span class="keyword">this</span>.product = product;</span><br><span class="line">    &#125;</span><br><span class="line">    VerticaTransferTask(CyclicBarrier barrier,Product product)&#123;</span><br><span class="line">        <span class="keyword">this</span>.barrier = barrier;</span><br><span class="line">        <span class="keyword">this</span>.product = product;</span><br><span class="line">    &#125;</span><br><span class="line"></span><br><span class="line"></span><br><span class="line">    <span class="meta">@Override</span></span><br><span class="line">    <span class="function"><span class="keyword">public</span> ResultInfo <span class="title">call</span><span class="params">()</span> </span>&#123;</span><br><span class="line">        <span class="comment">// TODO Auto-generated method stub</span></span><br><span class="line">        ResultInfo res = <span class="keyword">null</span>;</span><br><span class="line">        <span class="keyword">try</span> &#123;</span><br><span class="line">            <span class="keyword">double</span> money = product.getSalse_price() - product.getPurchase_price();</span><br><span class="line">            res = <span class="keyword">new</span> ResultInfo();</span><br><span class="line">            res.setPrice(money);</span><br><span class="line">            res.setProductId(product.getId());</span><br><span class="line">            Thread.sleep(<span class="number">1000</span>);</span><br><span class="line">        &#125;<span class="keyword">catch</span>(Exception e)&#123;</span><br><span class="line">            e.printStackTrace();</span><br><span class="line">        &#125; <span class="keyword">finally</span> &#123;</span><br><span class="line">            <span class="keyword">try</span> &#123;</span><br><span class="line">                barrier.await();</span><br><span class="line">            &#125; <span class="keyword">catch</span> (InterruptedException e) &#123;</span><br><span 
class="line">                <span class="comment">// TODO Auto-generated catch block</span></span><br><span class="line">                e.printStackTrace();</span><br><span class="line">            &#125; <span class="keyword">catch</span> (BrokenBarrierException e) &#123;</span><br><span class="line">                <span class="comment">// TODO Auto-generated catch block</span></span><br><span class="line">                e.printStackTrace();</span><br><span class="line">            &#125;</span><br><span class="line">        &#125;</span><br><span class="line">        <span class="keyword">return</span> res;</span><br><span class="line">    &#125;</span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure>
<p>运行后发现死锁啦，原因是什么呢？<br>查了一下 CyclicBarrier 资料，注意这一点：<br>CyclicBarrier 还提供一个更高级的构造函数 CyclicBarrier(int parties, Runnable barrierAction)，用于在线程到达屏障时，优先执行 barrierAction。<br>也就是在最后一个线程执行 barrier.await() 之后，会优先执行 DoAfter 类中的 run，而此时所有任务线程都还阻塞在 await() 上、call 尚未返回；run 中的 future.get() 又在阻塞等待 VerticaTransferTask call 的运行结果，形成了相互等待，造成了死锁。<br>这样我们就大概了解了在 Java 中有两种实现并行计算的方式，那么具体遇到问题的时候如何选择呢？<br>我们还是要清楚两者的概念：<br>CyclicBarrier 在到达屏障之后线程并没有处理结束，而是被阻塞等待，等到优先处理的 barrierAction 完成后，才被 signalAll 唤醒继续运行。<br>CyclicBarrier 中的源代码：</p>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br></pre></td><td class="code"><pre><span class="line"></span><br><span class="line"><span class="function"><span class="keyword">private</span> <span class="keyword">void</span> <span class="title">nextGeneration</span><span class="params">()</span> </span>&#123;</span><br><span class="line">     <span class="comment">// signal completion of last generation</span></span><br><span class="line">     trip.signalAll();</span><br><span class="line">     <span class="comment">// set up next generation</span></span><br><span class="line">     count = parties;</span><br><span class="line">     generation = <span class="keyword">new</span> Generation();</span><br><span class="line"> &#125;</span><br></pre></td></tr></table></figure>
<p>而 Future 是等待线程运行完成之后才获取结果，否则一直阻塞等待。</p>
<p><a href="http://richard-lee.iteye.com/blog/2276319" target="_blank" rel="noopener">http://richard-lee.iteye.com/blog/2276319</a></p>
</section>
    <!-- Tags START -->
    
    <div class="tags">
      <span>Tags:</span>
      
  <a href="/tags#多线程异步并发">
    <span class="tag-code">多线程异步并发</span>
  </a>

    </div>
    
    <!-- Tags END -->
    <!-- NAV START -->
    
  <div class="nav-container">
    <!-- reverse left and right to put prev and next in a more logical position -->
    
      <a class="nav-left" href="/2019/05/24/多线程异步并发/java线程池、包括线程的异常处理/">
        <span class="nav-arrow">← </span>
        
          java线程池、包括线程的异常处理
        
      </a>
    
    
      <a class="nav-right" href="/2019/05/24/多线程异步并发/java并发线程池拒绝策略/">
        
          java 并发线程池 拒绝策略
        
        <span class="nav-arrow"> →</span>
      </a>
    
  </div>

    <!-- NAV END -->
    <!-- 打赏 START -->
    
    <div class="money-like">
      <div class="reward-btn">
        赏
        <span class="money-code">
          <span class="alipay-code">
            <div class="code-image"></div>
            <b>使用支付宝打赏</b>
          </span>
          <span class="wechat-code">
            <div class="code-image"></div>
            <b>使用微信打赏</b>
          </span>
        </span>
      </div>
      <p class="notice">若你觉得我的文章对你有帮助，欢迎点击上方按钮对我打赏</p>
    </div>
    
    <!-- 打赏 END -->
    <!-- 二维码 START -->
    
    <div class="qrcode">
      <canvas id="share-qrcode"></canvas>
      <p class="notice">扫描二维码，分享此文章</p>
    </div>
    
    <!-- 二维码 END -->
    
    <!-- No Comment -->
    
  </article>
  <!-- Article END -->
  <!-- Catalog START -->
  
  <aside class="catalog-container">
  <div class="toc-main">
    <strong class="toc-title">Catalog</strong>
    
      <ol class="nav">none</ol>
    
  </div>
</aside>
  
  <!-- Catalog END -->
</main>

<script>
  /**
   * Article page bootstrap. Paints the banner, removes the fixed-header
   * class, swaps article images that fail to load for a placeholder, and
   * wires click-to-zoom on article images.
   * Relies on jQuery (`$`) and the jQuery `geopattern` plugin being loaded.
   */
  (function () {
    var url = 'https://fengzhaoy.github.io/2019/05/24/多线程异步并发/java异步计算场景应用/';
    var banner = ''
    // Placeholder shown in place of article images that fail to load.
    var errorImage = 'http://file.muyutech.com/error-img.png'

    // Use the configured banner image when there is one; otherwise generate
    // a pattern seeded with the post URL.
    if (banner !== '' && banner !== 'undefined' && banner !== 'null') {
      $('#article-banner').css({
        'background-image': 'url(' + banner + ')'
      })
    } else {
      $('#article-banner').geopattern(url)
    }
    $('.header').removeClass('fixed-header')

    // error image
    // `.one` (not `.on`): if the placeholder itself cannot be loaded, the
    // handler must not keep re-assigning `src` and re-triggering `error`.
    $(".markdown-content img").one('error', function () {
      $(this).attr('src', errorImage)
      $(this).css({
        'cursor': 'default'
      })
    })

    // zoom image
    $(".markdown-content img").on('click', function () {
      var src = $(this).attr('src')
      // Nothing to zoom for images without a source or for the placeholder.
      if (src && src !== errorImage) {
        var imageW = $(this).width()
        var imageH = $(this).height()

        // Scale towards 95% of the viewport width, clamped to [1, 2].
        var zoom = ($(window).width() * 0.95 / imageW).toFixed(2)
        zoom = zoom < 1 ? 1 : zoom
        zoom = zoom > 2 ? 2 : zoom
        // Vertical offset that centres the image in the viewport.
        var transY = (($(window).height() - imageH) / 2).toFixed(2)

        // Build the overlay from elements rather than concatenated HTML so
        // that `src` is assigned as an attribute value and never parsed as
        // markup.
        var zoomed = $('<img>').attr('src', src)
        var wrap = $('<div class="image-view-wrap"></div>')
          .append($('<div class="image-view-inner"></div>').append(zoomed))

        $('body').append(wrap)
        wrap.addClass('wrap-active')
        zoomed.css({
          // Pass a number: jQuery appends "px". A unit-less string such as
          // "320" is invalid CSS and would be ignored by the browser.
          'width': imageW,
          'transform': `translate3d(0, ${transY}px, 0) scale3d(${zoom}, ${zoom}, 1)`
        })
        // Lock page scrolling while the overlay is open.
        $('html').css('overflow', 'hidden')

        // Clicking the overlay closes it and restores scrolling.
        wrap.on('click', function () {
          $(this).remove()
          $('html').attr('style', '')
        })
      }
    })
  })();
</script>


<script>
  // Draw a QR code for the address of the current page onto the
  // #share-qrcode canvas.
  var qr = new QRious({
    value: window.location.href,
    element: document.querySelector('#share-qrcode')
  });
</script>





    <div class="scroll-top">
  <span class="arrow-icon"></span>
</div>
    <footer class="app-footer">
  <p class="copyright">
    &copy; 2019
  </p>
</footer>

<script>
  /**
   * Load an external script by inserting a new <script> element in front of
   * the first <script> element already present in the document.
   *
   * @param {string} u - URL assigned to the new element's `src`.
   * @param {Function} [c] - Optional callback, invoked as `c(null, event)`
   *   when the new element fires its `load` event.
   */
  function async (u, c) {
    var firstScript = document.getElementsByTagName('script')[0];
    var loader = document.createElement('script');

    loader.src = u;
    if (c) {
      loader.addEventListener('load', function (loadEvent) {
        c(null, loadEvent);
      }, false);
    }
    firstScript.parentNode.insertBefore(loader, firstScript);
  }
</script>
<script>
  // Fetch FastClick and, once it has loaded, attach it to the page body.
  async("//cdnjs.cloudflare.com/ajax/libs/fastclick/1.0.6/fastclick.min.js", function onFastClickLoaded() {
    FastClick.attach(document.body);
  });
</script>

<script>
  // String flag: the line-number gutter is hidden only when this is exactly
  // 'false'.
  var hasLine = 'true';
  // Once highlight.js has loaded, wrap the contents of every <pre> inside a
  // <figure> in a <code> element tagged with the figure's language, then
  // hand the <pre> to highlight.js.
  async ("//cdnjs.cloudflare.com/ajax/libs/highlight.js/9.12.0/highlight.min.js", function () {
    $('figure pre').each(function (i, block) {
      var figure = $(this).parents('figure');
      if (hasLine === 'false') {
        figure.find('.gutter').hide();
      }
      // The language is the second class name on the figure (e.g.
      // "highlight java"). A figure with no class attribute makes
      // `.attr('class')` return undefined; fall back to an empty string so
      // one such figure cannot throw and abort the remaining blocks.
      var classNames = (figure.attr('class') || '').trim().split(/\s+/);
      var lang = classNames[1] || 'code';
      var codeHtml = $(this).html();
      var codeTag = document.createElement('code');
      codeTag.className = lang;
      codeTag.innerHTML = codeHtml;
      // Replace the <pre>'s content with the new <code> wrapper and clear
      // any class on the <pre> itself.
      $(this).attr('class', '').empty().html(codeTag);
      figure.attr('data-lang', lang.toUpperCase());
      hljs.highlightBlock(block);
    });
  })
</script>
<!-- Baidu Tongji -->

<script src="/js/script.js"></script>
  </body>
</html>